home *** CD-ROM | disk | FTP | other *** search
/ Cream of the Crop 26 / Cream of the Crop 26.iso / os2 / pvm34b3.zip / pvm34b3 / pvm3 / xep / calc.c < prev    next >
C/C++ Source or Header  |  1997-07-22  |  12KB  |  538 lines

  1.  
  2. static char rcsid[] =
  3.     "$Id: calc.c,v 1.2 1997/07/09 13:56:49 pvmsrc Exp $";
  4.  
  5. /*
  6.  *         PVM version 3.4:  Parallel Virtual Machine System
  7.  *               University of Tennessee, Knoxville TN.
  8.  *           Oak Ridge National Laboratory, Oak Ridge TN.
  9.  *                   Emory University, Atlanta GA.
  10.  *      Authors:  J. J. Dongarra, G. E. Fagg, M. Fischer
  11.  *          G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
  12.  *         P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
  13.  *                   (C) 1997 All Rights Reserved
  14.  *
  15.  *                              NOTICE
  16.  *
  17.  * Permission to use, copy, modify, and distribute this software and
  18.  * its documentation for any purpose and without fee is hereby granted
  19.  * provided that the above copyright notice appear in all copies and
  20.  * that both the copyright notice and this permission notice appear in
  21.  * supporting documentation.
  22.  *
  23.  * Neither the Institutions (Emory University, Oak Ridge National
  24.  * Laboratory, and University of Tennessee) nor the Authors make any
  25.  * representations about the suitability of this software for any
  26.  * purpose.  This software is provided ``as is'' without express or
  27.  * implied warranty.
  28.  *
  29.  * PVM version 3 was funded in part by the U.S. Department of Energy,
  30.  * the National Science Foundation and the State of Tennessee.
  31.  */
  32.  
  33. /*
  34.  *    calc.c
  35.  *
  36.  *    Bag of tasks driver for tiled workers.
  37.  *    Manages idle, busy, failed workers and message routing sockets.
  38.  *
  39.  *    Oct 95 Manchek
  40.  */
  41.  
  42.  
  43. #ifdef HASSTDLIB
  44. #include <stdlib.h>
  45. #endif
  46. #include <stdio.h>
  47. #include <math.h>
  48. #include <X11/Xlib.h>
  49. #include <X11/cursorfont.h>
  50. #include <X11/Intrinsic.h>
  51. #include <X11/StringDefs.h>
  52. #include <X11/Shell.h>
  53. #include <X11/Xaw/Label.h>
  54. #include <X11/Xaw/Command.h>
  55. #include <X11/Xaw/Toggle.h>
  56. #include <X11/Xaw/Form.h>
  57. #include <pvm3.h>
  58. #include "../src/bfunc.h"
  59. #include "../src/listmac.h"
  60. #include "myalloc.h"
  61. #include "hostc.h"
  62. #include "imp.h"
  63.  
  64.  
  65. #ifndef    min
  66. #define    min(a,b)    ((a)<(b)?(a):(b))
  67. #endif
  68. #ifndef    max
  69. #define    max(a,b)    ((a)>(b)?(a):(b))
  70. #endif
  71.  
  72.  
  73. #define    JobSend            1            /* send tile to worker */
  74. #define    JobReturn        2            /* get image back from worker */
  75. #define    AddMessage        3            /* hosts were added */
  76. #define    DelMessage        4            /* hosts were deleted */
  77. #define    ExitMessage        5            /* worker exited */
  78. #define    RouteAddTag        6            /* new route socket opened */
  79. #define    RouteDeleteTag    7            /* route socket closed */
  80.  
  81. #define    TILEHEIGHT    10
  82.  
  83. struct worker {
  84.     struct worker *link, *rlink;    /* dll of active or idle */
  85.     int tid;                        /* worker tid */
  86.     struct job *job;                /* number */
  87.     XtInputId route;                /* X id for route socket */
  88. };
  89.  
  90. struct job {
  91.     struct job *link, *rlink;
  92.     int tile;                        /* tile number */
  93. };
  94.  
  95.  
  96. int gotmorehosts();
  97.  
  98.  
  99. /***************
  100.  **  Globals  **
  101.  **           **
  102.  ***************/
  103.  
  104. extern struct canvas imCan;    /* from xep.c */
  105. extern int dobars;            /* from xep.c */
  106. extern char *workerfile;    /* from xep.c */
  107. extern int nworkers;        /* from xep.c */
  108.  
  109. int mytid;                    /* pvm tid */
  110. struct worker *active = 0;    /* active workers */
  111. struct worker *idle = 0;    /* idle workers */
  112. struct job *todo = 0;        /* jobs to be started */
  113. int ntiles = 0;                /* number of tiles in frame */
  114.  
  115.  
  116. pvminit()
  117. {
  118.     if ((mytid = pvm_mytid()) < 0)
  119.         exit(1);
  120.  
  121.     pvm_notify(PvmRouteAdd, RouteAddTag, -1, (int *)0);
  122.     pvm_setopt(PvmRoute, PvmRouteDirect);
  123.  
  124.     host_init(AddMessage, DelMessage, gotmorehosts, (int (*)())0);
  125.  
  126.     idle = TALLOC(1, struct worker, "worker");
  127.     idle->link = idle->rlink = idle;
  128.     idle->tid = 0;
  129.     idle->job = 0;
  130.     active = TALLOC(1, struct worker, "worker");
  131.     active->link = active->rlink = active;
  132.     active->tid = 0;
  133.     active->job = 0;
  134.  
  135.     todo = TALLOC(1, struct job, "job");
  136.     todo->link = todo->rlink = todo;
  137.  
  138.     return 0;
  139. }
  140.  
  141.  
  142. more_workers()
  143. {
  144.     int nh;
  145.     int i;
  146.     struct hostc *hp;
  147.     struct worker *wp;
  148.     int tid;
  149.  
  150.     hp = 0;
  151.     nh = 0;
  152.     while (hp = host_next(hp))
  153.         nh++;
  154.  
  155.     nh -= nworkers;
  156.     if (nh <= 0)
  157.         return 0;
  158.  
  159.     hp = 0;
  160.     while (hp = host_next(hp)) {
  161.         i = 0;
  162.         for (wp = idle->link; wp != idle; wp = wp->link)
  163.             if (pvm_tidtohost(wp->tid) == hp->pvmd_tid) {
  164.                 i = 1;
  165.                 break;
  166.             }
  167.         if (i)
  168.             continue;
  169.         for (wp = active->link; wp != active; wp = wp->link)
  170.             if (pvm_tidtohost(wp->tid) == hp->pvmd_tid) {
  171.                 i = 1;
  172.                 break;
  173.             }
  174.         if (i)
  175.             continue;
  176.         if (pvm_spawn(workerfile, (char**)0, PvmTaskHost, hp->name, 1, &tid)
  177.         < 0) {
  178.             pvm_exit();
  179.             exit(1);
  180.         }
  181.         if (tid > 0) {
  182.             wp = TALLOC(1, struct worker, "worker");
  183.             wp->tid = tid;
  184.             wp->job = 0;
  185.             wp->route = 0;
  186.             LISTPUTBEFORE(idle, wp, link, rlink);
  187.             pvm_notify(PvmTaskExit, ExitMessage, 1, &tid);
  188. #ifdef    DEBUG
  189.             fprintf(stderr, "more_workers() new worker 0x%x\n", tid);
  190. #endif
  191.             nworkers++;
  192.         }
  193.     }
  194.  
  195.     setlabel();
  196.     return 0;
  197. }
  198.  
  199.  
  200. stop_workers()
  201. {
  202.     struct worker *wp;
  203.  
  204.     while ((wp = idle->link) != idle) {
  205. #ifdef    DEBUG
  206.         fprintf(stderr, "stop_workers() killing 0x%x\n", wp->tid);
  207. #endif
  208.         pvm_kill(wp->tid);
  209.         LISTDELETE(wp, link, rlink);
  210.         if (wp->route) {
  211. /*
  212.             fprintf(stderr, "removeaninput() xii %ld\n", wp->route);
  213. */
  214.             XtRemoveInput(wp->route);
  215.             wp->route = 0;
  216.         }
  217.         MY_FREE(wp);
  218.     }
  219.     while ((wp = active->link) != active) {
  220. #ifdef    DEBUG
  221.         fprintf(stderr, "stop_workers() killing 0x%x\n", wp->tid);
  222. #endif
  223.         pvm_kill(wp->tid);
  224.         LISTDELETE(wp, link, rlink);
  225.         if (wp->route) {
  226. /*
  227.             fprintf(stderr, "removeaninput() xii %ld\n", wp->route);
  228. */
  229.             XtRemoveInput(wp->route);
  230.             wp->route = 0;
  231.         }
  232.         MY_FREE(wp);
  233.     }
  234.     nworkers = 0;
  235.     setlabel();
  236.     return 0;
  237. }
  238.  
  239.  
  240. do_recalc()
  241. {
  242.     struct worker *wp;
  243.     struct job *jp;
  244.     int ht = imCan.cn_ht;
  245.     int i;
  246.  
  247.     /* toss anything that's already on the todo list */
  248.  
  249.     while ((jp = todo->link) != todo) {
  250.         LISTDELETE(jp, link, rlink);
  251.         MY_FREE(jp);
  252.     }
  253.  
  254.     /* scrub in-progress jobs */
  255.  
  256.     for (wp = active; (wp = wp->link) != active; )
  257.         wp->job->tile = -1;
  258.  
  259.     /* fill it with new tiles */
  260.  
  261.     ntiles = ht / TILEHEIGHT + 1;
  262.  
  263.     for (i = 0; i < ntiles; i += 8) {
  264.         jp = TALLOC(1, struct job, "job");
  265.         jp->tile = i;
  266.         LISTPUTBEFORE(todo, jp, link, rlink);
  267.     }
  268.     for (i = 4; i < ntiles; i += 8) {
  269.         jp = TALLOC(1, struct job, "job");
  270.         jp->tile = i;
  271.         LISTPUTBEFORE(todo, jp, link, rlink);
  272.     }
  273.     for (i = 2; i < ntiles; i += 4) {
  274.         jp = TALLOC(1, struct job, "job");
  275.         jp->tile = i;
  276.         LISTPUTBEFORE(todo, jp, link, rlink);
  277.     }
  278.     for (i = 1; i < ntiles; i += 2) {
  279.         jp = TALLOC(1, struct job, "job");
  280.         jp->tile = i;
  281.         LISTPUTBEFORE(todo, jp, link, rlink);
  282.     }
  283.  
  284.     assign_work();
  285.     return 0;
  286. }
  287.  
  288.  
  289. /*    assign_work()
  290. *
  291. *    Send off tiles to idle workers.
  292. */
  293.  
  294. assign_work()
  295. {
  296.     struct job *jp;
  297.     struct worker *wp;
  298.     double im1 = imCan.cn_im1;
  299.     double im2 = imCan.cn_im2 - im1;
  300.     int ht = imCan.cn_ht;
  301.     int wd = imCan.cn_wd;
  302.     double reim[4];                    /* tile corner coords */
  303.     int wdht[2];                    /* tile wd, ht */
  304.     int y1, y2;
  305.  
  306.     reim[0] = imCan.cn_re1;
  307.     reim[2] = imCan.cn_re2;
  308.     wdht[0] = wd;
  309.  
  310.     while (idle->link != idle && todo->link != todo) {
  311.         jp = todo->link;
  312.         LISTDELETE(jp, link, rlink);
  313.         y2 = ((jp->tile + 1) * ht) / ntiles;
  314.         y1 = (jp->tile * ht) / ntiles;
  315.         reim[1] = im1 + (y1 * im2) / ht;
  316.         reim[3] = im1 + (y2 * im2) / ht;
  317.         wdht[1] = y2 - y1;
  318.         wp = idle->link;
  319.         LISTDELETE(wp, link, rlink);
  320.         LISTPUTBEFORE(active, wp, link, rlink);
  321.         wp->job = jp;
  322. #ifdef    DEBUG
  323.         fprintf(stderr, "sent job %d to 0x%x: %dx%d %f,%f/%f,%f\n",
  324.                 jp->tile, wp->tid,
  325.                 wdht[0], wdht[1], reim[0], reim[1], reim[2], reim[3]);
  326. #endif
  327.         pvm_packf("%+ %2lx %2d", PvmDataDefault, reim, wdht);
  328.         if (pvm_send(wp->tid, JobSend)) {
  329.             pvm_exit();
  330.             exit(1);
  331.         }
  332.  
  333.         if (dobars)
  334.             label_row(y1, y2, wp->tid);
  335.     }
  336.  
  337.     /*
  338.     * we must check receive here because messages may have arrived
  339.     * while we were sending, so the socket will not be ready to
  340.     * read to wake us up.  a bit sick.
  341.     */
  342.  
  343.     while (pvm_nrecv(-1, -1) > 0)
  344.         claim_message();
  345.     return 0;
  346. }
  347.  
  348.  
  349. claim_message()
  350. {
  351.     struct job *jp;
  352.     struct worker *wp;
  353.     int tag;
  354.     int tid;        /* id of sender */
  355.     int wd = imCan.cn_wd;
  356.     int ht = imCan.cn_ht;
  357.     double im1 = imCan.cn_im1;
  358.     double im2 = imCan.cn_im2 - im1;
  359.     double reim[4];                    /* tile corner coords */
  360.     int wdht[2];                    /* tile wd, ht */
  361.     int y1, y2;                        /* tile start, end rows */
  362.     int h;                            /* height of tile */
  363.  
  364.     pvm_bufinfo(pvm_getrbuf(), (int*)0, &tag, &tid);
  365. #ifdef    DEBUG
  366.     fprintf(stderr, "message %d from 0x%x\n", tag, tid);
  367. #endif
  368.  
  369.     if (tag == AddMessage) {
  370.         host_add();
  371.  
  372.     } else if (tag == DelMessage) {
  373.         host_delete();
  374.  
  375.     } else if (tag == ExitMessage) {
  376.         pvm_upkint(&tid, 1, 1);
  377.         for (wp = idle->link; wp != idle; wp = wp->link) {
  378.             if (wp->tid == tid) {
  379.                 LISTDELETE(wp, link, rlink);
  380.                 if (wp->route) {
  381. /*
  382.                     fprintf(stderr, "removeaninput() xii %ld\n", wp->route);
  383. */
  384.                     XtRemoveInput(wp->route);
  385.                     wp->route = 0;
  386.                 }
  387.                 MY_FREE(wp);
  388.                 nworkers--;
  389.                 setlabel();
  390.                 goto tryagain;
  391.             }
  392.         }
  393.         for (wp = active->link; wp != active; wp = wp->link) {
  394.             if (wp->tid == tid) {
  395.                 int i;
  396.  
  397.                 LISTDELETE(wp, link, rlink);
  398.                 jp = wp->job;
  399.                 if (jp->tile >= 0) {
  400.                     LISTPUTAFTER(todo, jp, link, rlink);
  401.                     y1 = (jp->tile * ht) / ntiles;
  402.                     y2 = ((jp->tile + 1) * ht) / ntiles;
  403.                     h = y2 - y1;
  404.                     for (i = wd * h; i-- > 0; )
  405.                         *((char*)imCan.cn_dat + y1 * wd + i) = 32;
  406. /*
  407.                     BZERO((char*)imCan.cn_dat + y1 * wd, wd * h);
  408. */
  409.                     repaint_region(&imCan, 0, y1, wd - 1, y2 - 1);
  410.                     refresh_region(&imCan, 0, y1, wd - 1, y2 - 1);
  411.                 } else {
  412.                     MY_FREE(jp);
  413.                 }
  414.                 if (wp->route) {
  415. /*
  416.                     fprintf(stderr, "removeaninput() xii %ld\n", wp->route);
  417. */
  418.                     XtRemoveInput(wp->route);
  419.                     wp->route = 0;
  420.                 }
  421.                 MY_FREE(wp);
  422.                 nworkers--;
  423.                 setlabel();
  424. /*
  425.                 assign_work();
  426. */
  427.                 goto tryagain;
  428.             }
  429.         }
  430.  
  431. tryagain:
  432.         gotmorehosts();        /* hope we can start another */
  433.  
  434.     } else if (tag == JobReturn) {
  435.         for (wp = active->link; wp != active; wp = wp->link)
  436.             if (wp->tid == tid)
  437.                 break;
  438.         if (wp == active) {
  439.             fprintf(stderr, "bogus message?\n");
  440.             return;
  441.         }
  442.         LISTDELETE(wp, link, rlink);
  443.         jp = wp->job;
  444.  
  445.         if (jp->tile >= 0) {
  446. #ifdef    DEBUG
  447.             fprintf(stderr, "got job %d from 0x%x\n", jp->tile, wp->tid);
  448. #endif
  449.             y1 = (jp->tile * ht) / ntiles;
  450.             y2 = ((jp->tile + 1) * ht) / ntiles;
  451.             h = y2 - y1;
  452.             pvm_unpackf("%*c", wd * h, (char*)imCan.cn_dat + y1 * wd);
  453.  
  454.             if (dobars)
  455.                 label_row(y1, y2, wp->tid);
  456.  
  457.             repaint_region(&imCan, 0, y1, wd - 1, y2 - 1);
  458.             refresh_region(&imCan, 0, y1, wd - 1, y2 - 1);
  459.         }
  460.  
  461.         MY_FREE(jp);
  462.  
  463.         /* put work server back on free list */
  464.  
  465.         LISTPUTBEFORE(idle, wp, link, rlink);
  466.  
  467.         /* assign more work if available */
  468.  
  469.         assign_work();
  470.  
  471.     } else if (tag == RouteAddTag) {
  472.         int tid, fd;
  473.         XtInputId xii;
  474.  
  475.         pvm_unpackf("%d %d", &tid, &fd);
  476.         fprintf(stderr, "got route add notify tid 0x%x fd %d\n", tid, fd);
  477.         for (wp = idle->link; wp != idle; wp = wp->link)
  478.             if (wp->tid == tid)
  479.                 break;
  480.         if (wp == idle)
  481.             for (wp = active->link; wp != active; wp = wp->link)
  482.                 if (wp->tid == tid)
  483.                     break;
  484.         if (wp == active) {
  485.             fprintf(stderr, "route add notify for worker not mine tid 0x%x\n",
  486.                     tid);
  487.  
  488.         } else {
  489.             addaninputfile(fd, &wp->route);
  490.             pvm_notify(PvmRouteDelete, RouteDeleteTag, 1, &tid);
  491.         }
  492.  
  493.     } else if (tag == RouteDeleteTag) {
  494.         int tid, fd;
  495.         XtInputId xii;
  496.  
  497.         pvm_unpackf("%d %d", &tid, &fd);
  498.         fprintf(stderr, "got route delete notify tid 0x%x fd %d\n", tid, fd);
  499.         for (wp = idle->link; wp != idle; wp = wp->link)
  500.             if (wp->tid == tid)
  501.                 break;
  502.         if (wp == idle)
  503.             for (wp = active->link; wp != active; wp = wp->link)
  504.                 if (wp->tid == tid)
  505.                     break;
  506.         if (wp == active) {
  507.             fprintf(stderr,
  508.                     "route delete notify for worker not mine tid 0x%x\n", tid);
  509.  
  510.         } else if (wp->route) {
  511. /*
  512.             removeaninputfile(wp->route);
  513. */
  514. /*
  515.             fprintf(stderr, "removeaninput() xii %ld\n", wp->route);
  516. */
  517.             XtRemoveInput(wp->route);
  518.             wp->route = 0;
  519.         }
  520.     }
  521.  
  522.     return 0;
  523. }
  524.  
  525.  
  526. int
  527. gotmorehosts()
  528. {
  529. #ifdef    DEBUG
  530.     fprintf(stderr, "host added\n");
  531. #endif
  532.     more_workers();
  533.     assign_work();
  534.     return 0;
  535. }
  536.  
  537.  
  538.